热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

都会|下游_RxJava线程切换原理

篇首语:本文由编程笔记#小编为大家整理,主要介绍了RxJava线程切换原理相关的知识,希望对你有一定的参考价值。推荐推荐几篇在学习Rxjav

篇首语:本文由编程笔记#小编为大家整理,主要介绍了RxJava 线程切换原理相关的知识,希望对你有一定的参考价值。



推荐

推荐几篇在学习Rxjava中的阅读的文章。尤其是大神W_BinaryTree的文章,给学习过程中带来了不少启发。


  1. 什么是函数响应式编程(Java&android版本)
    函数响应式编程介绍
  2. Rxjava2.0 较全的Api介绍和使用,可以当作开发手册
    Rxjava2.0 Api介绍和使用
  3. Rxjava2.0 和1.0的主要区别
    Rxjava2.0 和1.0的主要区别
  4. Rxjava github官方地址
    Github地址
  5. RxJava背压策略的原理
    背压策略
    背压的理解
  6. Rxjava实战系列
    Android RxJava 实际应用讲解:(无条件)网络请求轮询
    Android RxJava 实际应用讲解:(有条件)网络请求轮询
    Android RxJava 实际应用讲解:网络请求嵌套回调
    Android RxJava 实际应用讲解:合并数据源
    Android RxJava 实际应用讲解:从磁盘 / 内存缓存中 获取缓存数据
    Android RxJava 实际应用讲解:联合判断
    Android RxJava:细说 线程控制(切换 / 调度 )(含Retrofit实例讲解)
    Android RxJava 实际应用讲解:网络请求出错重连(结合Retrofit)
  7. Rxjava 深度教程,对Rxjava讲解的比较透彻
    放弃RxBus,拥抱RxJava
    Rxjava的推理过程
    大神关于实际运用过程中的一些看法
  8. Rxjava中设计到的Monad概念理解
    函数式编程有一个重要概念,叫做Monad
    理解 Monad,一份 monad 的解惑指南
    什么是 Monad (Functional Programming)?
    Functors, Applicatives, And Monads In Pictures

基础概念


观察者模式


  1. 观察者概念

观察者模式(Observer Mode)是定义对象间的一对多的依赖关系,当被观察者的状态发生改变时,所有依赖于它的对象都得到通知并自动刷新。
在观察者模式中有以下四个主要角色:
抽象主题[抽象被观察者](Subject):定义添加和删除观察者的方法,内部通过集合维护观察者序列。
具体主题[具体被观察者](Concrete Subject):抽象主题的实现对象,在具体主题内部状态发生变化时,通知所有的观察者更新状态。
抽象观察者(Observer):定义观察者的统一接口和方法。
具体观察者(Concrete Observer):抽象观察者的具体实现类,实现抽象观察者定义的统一接口,以便使本身的状态与主题状态协调。
经典的观察者模式UML类图:

2. 观察者模式在Rxjava中的运用

为了方便分析问题,下面给出Rxjava实现的最简单的被观察者(主题)发送数据观察者打印数据的代码。从代码中分析Rxjava中是如何定义并且实现观察者模式中不同的角色。为了方便说明问题,把Rxjava中的链式(Chain)拆分成最基本的3段。
(1)Observable对象创建,抽象类Observable是接口ObservableSource下的一个抽象实现,通过Observable创建一个可观察对象发射事件流。
(2)Observer对象创建,创建一个观察者Observer来接受并响应可观察对象发射的事件。
(3)Observer订阅Observable,通过subscribe方法,使Observer与Observable建立订阅关系,Observer与Observable便成为了一个整体,Observer便可对Observable中的行为作出响应。
PS: 虽然从代码上看上去像是Observable订阅了Observer,但是其实还是观察者订阅了被观察者,Rxjava这么设计是为了保持链式调用(Chain)。
这里问了说明问题,没有采用极简的代码实现。
java实现:

private void rxjavaDemo()
Observable observable = Observable.create(new ObservableOnSubscribe()
@Override
public void subscribe(ObservableEmitter e) throws Exception
e.onNext("R");
e.onNext("X");
e.onComplete();

);
Observer observer = new Observer()
@Override
public void onSubscribe(Disposable d)

@Override
public void onNext(Object s)
Log.e(RxjavaDemoActivity.class.getSimpleName(), "object : " + s);

@Override
public void onError(Throwable e)

@Override
public void onComplete()

;
observable.subscribe(observer);

Kotlin实现:

private fun rxjavaDemo()
val mObservable = Observable.create
it.onNext("R")
it.onNext("X")
it.onComplete()

val mObserver = object : Observer
override fun onComplete()

override fun onSubscribe(d: Disposable?)

override fun onNext(value: String?)
Log.e(RxjavaDemoActivity::class.java.simpleName, "object : $value")

override fun onError(e: Throwable?)


mObservable.subscribe(mObserver)


  1. 被观察者(Observable)

在上面代码中我们调用了Observable的create方法来创建被观察者。
(1)在Observable类内部提供了众多的静态方法来创建被观察者。诸如:create、just、interval、from、zip、contact、merge等方法。
(2)requireNonNull方法,是Rxjava的判空实现,防止出现空指针异常。
(3)在create方法中会创建ObservableCreate的被观察者。

@SchedulerSupport(SchedulerSupport.NONE)
public static Observable create(ObservableOnSubscribe source)
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate(source));

被观察者ObservableCreate类继承自抽象类Observable,内部实现了父类的subscribeActual方法。

public final class ObservableCreate extends Observable
..........
@Override
protected void subscribeActual(Observer observer)
CreateEmitter parent = new CreateEmitter(observer);
observer.onSubscribe(parent);
try
source.subscribe(parent);
catch (Throwable ex)
Exceptions.throwIfFatal(ex);
parent.onError(ex);


.................

被观察者的抽象类Observable,Observable又是接口ObservableSource下的一个抽象实现。
(1)内部实现了ObservableSource接口定义的subscribe方法。subscribe方法内部主要是调用了subscribeActual方法,所有断定订阅关系是在subscribeActual方法内部实现的。
(2)所以要实现订阅关系,观察者真正需要复写的是subscribeActual方法。比如ObservableCreate类就复写了该方法。

public abstract class Observable implements ObservableSource
..........
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer observer)
ObjectHelper.requireNonNull(observer, "observer is null");
try
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
catch (NullPointerException e) // NOPMD
throw e;
catch (Throwable e)
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;


..............

Observable实现了ObservableSource接口,在ObservableSource内部定义了subscribe方法用来实现订阅观察者(Observer)。

public interface ObservableSource
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if @code observer is null
*/
void subscribe(Observer observer);

总结:
(1)ObservableSource就是扮演着抽象被观察者的角色。
(2)在ObservableSource 接口中定义了subscribe方法用来用来实现订阅观察者(Observer)。
(3)Observable类实现了ObservableSource接口并且实现了其subscribe方法,但是它并没有真正的去完成主题和观察者之间的订阅关系,而是内部调用了另一个抽象方法subscribeActual。
(4)在Observable内部提供了一系列创建型操作符, 用来创建不同场景的Observable。

经过上面的介绍,我们已经明白了在Rxjava中被观察者(Observable)是如何创建的,以及是谁扮演者抽象观察者的角色。但是我们并没有在ObservableCreate类中发现具体发送事件的实现。那么这里就有一个问题:

问题: ObservableCreate等内部是如何发送事件到观察者(Observer)的?


  1. 观察者(Observer)

通过上面的代码和分析,我们知道Observer扮演着抽象观察者的角色。下面分别解释一下Observer类内部定义的四个主要的方法:
(1)onSubscribe(Disposable d)里面的Disposable对象,Disposable翻译过来是可随意使用的。相当于观察者和被观察者之间的订阅关系,如果观察者不想订阅被观察者了,可以调用 mDisposable.dispose()取消订阅关系。
(2)onCompleted(): 事件队列完成。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
(3)onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
(4)onNext():接收数据。
(5)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。而且onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

public interface Observer
/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within @link #onNext(Object)) and asynchronous manner.
* @param d the Disposable instance whose @link Disposable#dispose() can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(Disposable d);
/**
* Provides the Observer with a new item to observe.
*


* The @link Observable may call this method 0 or more times.
*


* The @code Observable will not call this method again after it calls either @link #onComplete or
* @link #onError.
*
* @param value
* the item emitted by the Observable
*/
void onNext(T value);
/**
* Notifies the Observer that the @link Observable has experienced an error condition.
*


* If the @link Observable calls this method, it will not thereafter call @link #onNext or
* @link #onComplete.
*
* @param e
* the exception encountered by the Observable
*/
void onError(Throwable e);
/**
* Notifies the Observer that the @link Observable has finished sending push-based notifications.
*


* The @link Observable will not call this method if it calls @link #onError.
*/
void onComplete();

既然Observer是个接口,那么就应该是个抽象观察者,具体的观察者是我们在实际运用的时候直接new一个实例对象。

经过上面的介绍,我们已经明白了在Rxjava中观察者是如何创建的以及各个方法的作用。那么观察者是如何接受被观察者发送的事件的呢?

问题: Observer是如何接受数据到被观察者发送的数据?


订阅

前提:观察者订阅被观察者后,被观察者才会开始发送事件。
PS:但是并不是所有的被观察者都需要被订阅才会发送数据,比如Observable.just的方法返回的ObservableJust被观察者者。
示例:执行下面的代码会输出JustObservable,但是将just换成create方法就不会输出,所有得出结论:并不是所有的被观察者都需要被订阅才会发送数据

Observable.just(new JustObservable());
class JustObservable
JustObservable()
Log.e("rx","JustObservable");

下面我们来分析上面遗留的两个问题:即观察者和被观察者是如何发送和接受事件的

Observable.create生成的被观察者需要被订阅后才会发送数据到观察者。
根据上面的分析我们知道:这里的observable对象是ObservableCreate类的实例。

observable.subscribe(observer);

这里的subscribe是父类Observable的方法,在里面又会调用subscribeActual方法,Observable的子类ObservableCreate会复写subscribeActual方法。

public final void subscribe(Observer observer)
ObjectHelper.requireNonNull(observer, "observer is null");
try
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
catch (NullPointerException e) // NOPMD
throw e;
catch (Throwable e)
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;


下面来分析一下ObservableCreate类。
(1)在ObservableCreate的构造方法中有个ObservableOnSubscribe类型的形参。
(2)并且正如我们上面所说内部实现了subscribeActual方法。
(3)所以真正处理被观察者和观察者之间实现订阅的逻辑在Observable的subscribeActual方法中。

public final class ObservableCreate extends Observable
final ObservableOnSubscribe source;
public ObservableCreate(ObservableOnSubscribe source)
this.source = source;

@Override
protected void subscribeActual(Observer observer)
CreateEmitter parent = new CreateEmitter(observer);
observer.onSubscribe(parent);
try
source.subscribe(parent);
catch (Throwable ex)
Exceptions.throwIfFatal(ex);
parent.onError(ex);


.............

那么在ObservableCreate的构造方法的形参的赋值肯定是在ObservableCreate对象初始化的时候,然而ObservableCreate的初始化,是通过Observable的create方法,下面我们回到Observable的create的方法。

public static Observable create(ObservableOnSubscribe source)
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate(source));

在调用create方法的时候需要传递ObservableOnSubscribe的对象作为参数,而这个对象最终会传入到ObservableCreate的构造方法中。
下面来看一下ObservableOnSubscribe的作用:
(1)内部声明了一个subscribe方法,subscribe 接收到一个ObservableEmitter对象。

public interface ObservableOnSubscribe
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(ObservableEmitter e) throws Exception;

下面再来看一下ObservableEmitter类的作用:
(1)ObservableEmitter以一种可以安全取消的形式发送事件到观察者,通过调用setDisposable方法。
(2)继承了Emitter接口。

public interface ObservableEmitter extends Emitter
/**
* Sets a Disposable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param d the disposable, null is allowed
*/
void setDisposable(Disposable d);
/**
* Sets a Cancellable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param c the cancellable resource, null is allowed
*/
void setCancellable(Cancellable c);
/**
* Returns true if the downstream disposed the sequence.
* @return true if the downstream disposed the sequence
*/
boolean isDisposed();
/**
* Ensures that calls to onNext, onError and onComplete are properly serialized.
* @return the serialized ObservableEmitter
*/
ObservableEmitter serialize();

下面我们来看一下Emitter类的作用:
(1)Emitter翻译过来就是发射器的意识,到这里我们可以想到,该类内部应该定义了一些跟事件发送相关的方法。
(2)并且在实例化ObservableOnSubscribe的时候,我们正好是调用了ObservableEmitter的onNext向观察者发送数据的。

Observable observable = Observable.create(new ObservableOnSubscribe()
@Override
public void subscribe(ObservableEmitter e) throws Exception
e.onNext("R");
e.onNext("X");
e.onComplete();

);

public interface Emitter
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(Throwable error);
/**
* Signal a completion.
*/
void onComplete();

由此我们可以知道Emitter内部声明了三种事件类型,而ObservableEmitter 扩展了Emiiter的功能,添加了Disposable相关的方法,可以用来安全取消事件的发送即取消观察者和被观察者之间的订阅关系。
由上诉分析我们已经知道了ObservableCreate的ObservableOnSubscribe变量的来历和基本作用以及被观察者的创建过程。
下面继续回到ObservableCreate类的subscribeActual方法来看看事件是如何从被观察者发送到观察者的。
(1)在subscribeActual方法内部创建了CreateEmitter类对象,并且接受Observer作为参数,CreateEmitter实现了ObservableEmitter接口。所以该类是负责事件发送,到这里我们已经明确了事件的发送类即ObservableEmitter。
(2)在subscribeActual方法内部,调用了ObservableOnSubscribe的subscribe方法并且传递ObservableEmitter对象实例作为参数,然后就可以调用ObservableEmitter的方法发送事件了。
(3)在subscribeActual方法内部,调用了Observer的onSubscribe方法并且传递CreateEmitter作为参数,这样观察者就持有了发送事件(被观察者)的直接引用,方便观察者取消订阅关系。
到这里我们已经确定到了:观察者是如何和被观察者订阅的。以及事件是如何发送到被观察者的。并且确认了只有发生了订阅关系,事件才可以发送。

下面在来看一下CreateEmitter类的实现逻辑。

(1)CreateEmitter的构造函数,传递一个Observer的对象作为形参。这样就可以将事件发送到对应的观察者了。
(2)CreateEmitter实现了ObservableEmitter接口,作为事件发送器。
(3)onNext事件中,不可以发送参数为null的类型,在事件序列没有中断的情况下把事件从被观察者传递给观察者。
(4)onComplete事件,用于通知观察者事件队列已经没有事件发送了。
(5)onError 事件,事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
(6)setDisposable、setCancellable方法,观察者根据获取到的Emitter的实例对象,可以取消被观察者和观察者之间的订阅关系。

static final class CreateEmitter
extends AtomicReference
implements ObservableEmitter, Disposable
private static final long serialVersionUID = -3434801548987643227L;
final Observer observer;
CreateEmitter(Observer observer)
this.observer = observer;

@Override
public void onNext(T t)
if (t == null)
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;

if (!isDisposed())
observer.onNext(t);


@Override
public void onError(Throwable t)
if (t == null)
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");

if (!isDisposed())
try
observer.onError(t);
finally
dispose();

else
RxJavaPlugins.onError(t);


@Override
public void onComplete()
if (!isDisposed())
try
observer.onComplete();
finally
dispose();



@Override
public void setDisposable(Disposable d)
DisposableHelper.set(this, d);

@Override
public void setCancellable(Cancellable c)
setDisposable(new CancellableDisposable(c));

@Override
public ObservableEmitter serialize()
return new SerializedEmitter(this);

@Override
public void dispose()
DisposableHelper.dispose(this);

@Override
public boolean isDisposed()
return DisposableHelper.isDisposed(get());


小结:
(1)在RxJava中Observer通过onSubscribe方法获取了发送事件中的Disposable对象,这样他就可以控制观察者和被观察者之间的订阅关系。
(2)被观察者并没有直接控制事件的发送,而是将事件的发送给Disposable对象的发送。
(3)订阅关系并没有发生在subscribe方法中,而是在subscribeActual方法中实现了订阅关系。


简单的线程切换

下面的这段代码实现了最简单的Rxjava线程切换。发送事件就可以在非UI线程(RxNewThreadScheduler 的线程执行,将耗时操作放在子线程中,避免阻塞UI线程。接受事件又切换回了Android UI线程,Android禁止在非UI线程操作UI。这样就简单的实现了在自线程处理耗时操作然后在UI线程刷新UI的逻辑。

为了说明问题,把代码拆分成五段,可以看出,其实每次的链式调用都会生成不同的Observable对象,所以我们在平时开发的时候,应该尽可能避免长链式的调用,规避掉不需要的中间操作。

private void rxjavaDemo()
Observable observable = Observable.create(new ObservableOnSubscribe()
@Override
public void subscribe(ObservableEmitter e) throws Exception
Log.e("rxjava","ObservableEmitter current thread :"+ Thread.currentThread().getName());
e.onNext("R");
e.onNext("X");
e.onComplete();

);
Observable observableSubscribeOn = observable.subscribeOn(Schedulers.newThread());
Observer observer = new Observer()
@Override
public void onSubscribe(Disposable d)
Log.e("rxjava","onSubscribe current thread :"+ Thread.currentThread().getName());

@Override
public void onNext(Object s)
Log.e("rxjava","onNext current thread :"+ Thread.currentThread().getName());
Log.e(RxjavaDemoActivity.class.getSimpleName(), "object : " + s);

@Override
public void onError(Throwable e)

@Override
public void onComplete()

;
Observable observableObserveOn = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread());

observableObserveOn.subscribe(observer);


subscribeOn

下面我们将结合上面的分析和代码,分析一下在observable.subscribeOn内部都做了那些操作。
(1)实例化了ObservableSubscribeOn对象。并且传入的两个参数分别是ObservableCreate对象和Scheduler对象。
(2)ObservableCreate对象是我们上面分析的Observable.create生成的一个被观察者。
(3)Scheduler对象,现在猜测应该是和线程调度有关的类。接下来会分析到。

public final Observable subscribeOn(Scheduler scheduler)
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));

下面在来分析ObservableSubscribeOn类。

public final class ObservableSubscribeOn extends AbstractObservableWithUpstream
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler)
super(source);
this.scheduler = scheduler;

@Override
public void subscribeActual(final Observer s)
final SubscribeOnObserver parent = new SubscribeOnObserver(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable()
@Override
public void run()
source.subscribe(parent);

));

..........

(1)继承了AbstractObservableWithUpstream类。AbstractObservableWithUpstream类内部存储了ObservableSource类对象,根据上下文,这里的source就是ObservableCreate类对象。
(2)实现了subscribeActual方法,并且在在subscribeActual方法内部创建了SubscribeOnObserver对象,这里的SubscribeOnObserver其实也是个观察者,可以理解成事件经过SubscribeOnObserver观察者中转了才最终到达我们创建的观察者。SubscribeOnObserver 是AtomicReference的子类(保证原子性),实现了 Observer接口 和 Disposable 接口。
(3)内部调用onSubscribe方法将SubscribeOnObserver传递给观察者,这样观察者就可以控制事件的接受了,即获取了事件发送(被观察者发送数据)的控制权。
(3)ObservableSubscribeOn和ObservableCreate一样,也是Observable的一个子类。并且在ObservableSubscribeOn内部持有它上一步的被观察者Observable的引用(这里就是ObservableCreate)。
(4) source.subscribe(parent)实现了观察者和被观察者之间的订阅关系,并将通过SubscribeOnObserver类对象传递给ObservableOnSubscribe的subscribe方法。SubscribeOnObserver类对象就可以实现事件的发送了。
下面在来看一下SubscribeOnObserver类:作用和代码基本同于CreateEmitter类,看上面的CreateEmitter类分析即可。

static final class SubscribeOnObserver extends AtomicReference implements Observer, Disposable
private static final long serialVersionUID = 8094547886072529208L;
final Observer actual;
final AtomicReference s;
SubscribeOnObserver(Observer actual)
this.actual = actual;
this.s = new AtomicReference();

@Override
public void onSubscribe(Disposable s)
DisposableHelper.setOnce(this.s, s);

@Override
public void onNext(T t)
actual.onNext(t);

@Override
public void onError(Throwable t)
actual.onError(t);

@Override
public void onComplete()
actual.onComplete();

@Override
public void dispose()
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);

@Override
public boolean isDisposed()
return DisposableHelper.isDisposed(get());

void setDisposable(Disposable d)
DisposableHelper.setOnce(this, d);


经过上面的分析,我们明确了observable.subscribeOn(Schedulers.newThread())创建的被观察者(Observable)和观察者(Observer)之间的订阅关系。下面来分析一下subscribeOn是如何实现线程切换的。首先我们思考一下,如果要实现线程切换,肯定要创建子线程。
问题: 子线程是如何创建
在调用subscribeOn的时候,传入了Scheduler参数,Scheduler翻译过来就是调度者的意识。通过调用Schedulers的newThread方法,创建子线程(RxNewThreadScheduler)。下面我们以RxNewThreadScheduler线程为例。看看线程是如何被创建的。

Observable observableSubscribeOn = observable.subscribeOn(Schedulers.newThread());

在Schedulers类内部定义了newThread静态方法用于生成Scheduler对象。
(1)其中NEW_THREAD为默认的生成的一个Scheduler对象。


static final Scheduler NEW_THREAD;
public static Scheduler newThread()
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);

static
..........
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable()
@Override
public Scheduler call() throws Exception
return NewThreadHolder.DEFAULT;

);

接下来我们就来看看,initNewThreadScheduler() 是如何生成一个Scheduler实例的。
(1)在initNewThreadScheduler方法中经过一系列的条件判断,最终会执行到call方法(延迟初始化)。
(2)NewThreadHolder.DEFAULT会返回一个NewThreadScheduler对象(单例模式)

static final Scheduler DEFAULT = NewThreadScheduler.instance();

下面再来看看单例模式(饿汉式)的NewThreadScheduler类,看名字就可以猜测到是线程调度者。

(1)NewThreadScheduler 继承自Scheduler抽象类。
(2)通过静态代码块中创建了RxThreadFactory线程工厂对象,该类实现了ThreadFactory接口,并且在RxThreadFactory类的newThread方法中创建了优先级为5的线程Thread。
(3)在NewThreadScheduler的createWorker()方法中,创建了NewThreadWorker 对象。

public final class NewThreadScheduler extends Scheduler
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);

public static NewThreadScheduler instance()
return INSTANCE;

private NewThreadScheduler()

@Override
public Worker createWorker()
return new NewThreadWorker(THREAD_FACTORY);

接下来我们就来看看NewThreadWorker 都做了写什么。
(1)在NewThreadWorker的构造函数中,通过调用SchedulerPoolFactory.create的方法并且传入NewThreadScheduler中提供的线程工厂RxThreadFactory创建了一个ScheduledExecutorService对象。

public class NewThreadWorker extends Scheduler.Worker implements Disposable
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory)
executor = SchedulerPoolFactory.create(threadFactory);

.........

在来看一下SchedulerPoolFactory类
(1)通过create方法创建了核心线程数量为1的线程池。

/**
* Creates a ScheduledExecutorService with the given factory.
* @param factory the thread factory
* @return the ScheduledExecutorService
*/
public static ScheduledExecutorService create(ThreadFactory factory)
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (exec instanceof ScheduledThreadPoolExecutor)
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);

return exec;

分析到这里我们明确了,通过Schedulers.newThread()会创建一个核心线程数量为1的线程池。
创建完线程,下面就是启动和运行线程了,并且将事件的发送,放在子线程中进行处理。并且我们都知道调用多次 subscribeOn 指定子线程只有第一次会生效 ,下面我们将带着这两个疑问,来分析一下。
在ObservableSubscribeOn的subscribeActual方法中,通过source.subscribe(parent)调用实现了观察者和被观察者之间的订阅关系。我们可以看到该方法的执行是放在了Runnable里面执行的。所以线程的切换,应该就发生子此处。
(1)经过上面的分析,这里的scheduler对象是NewThreadScheduler类。并且调用了Schedule的scheduleDirect方法。

parent.setDisposable(scheduler.scheduleDirect(new Runnable()
@Override
public void run()
source.subscribe(parent);

));

下面来看看Scheduler类的scheduleDirect方法。
(1)内部调用了重载的scheduleDirect方法。
(2)createWorker返回的是NewThreadWorker类对象。并且调用了NewThreadWorker类的schedule方法。

public Disposable scheduleDirect(Runnable run)
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);

public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit)
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable()
@Override
public void run()
try
decoratedRun.run();
finally
w.dispose();


, delay, unit);
return w;

下面来看看NewThreadWorker类的schedule方法。
(1)在schedulerActual方法中,通过ScheduledExecutorService执行submit或schedule执行一个Runnable任务,即开启了线程池里面的线程任务。

@Override
public Disposable schedule(final Runnable run)
return schedule(run, 0, null);

@Override
public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit)
if (disposed)
return EmptyDisposable.INSTANCE;

return scheduleActual(action, delayTime, unit, null);

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent)
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null)
if (!parent.add(sr))
return sr;


Future f;
try
if (delayTime <&#61; 0)
f &#61; executor.submit((Callable)sr);
else
f &#61; executor.schedule((Callable)sr, delayTime, unit);

sr.setFuture(f);
catch (RejectedExecutionException ex)
parent.remove(sr);
RxJavaPlugins.onError(ex);

return sr;

分析到这里我们知道了线程的开启是在NewThreadWorker类中进行的。

那么还有个疑问&#xff1a;调用多次 subscribeOn 指定子线程只有第一次会生效
&#xff08;1&#xff09;这里的生效并不是指其他的subscribeOn方法创建的线程没有生效&#xff0c;而是会被第一次的subscribeOn创建的线程“掩盖掉”
&#xff08;2&#xff09;多次调用subscribeOn&#xff0c;会生成若干个Observable对象&#xff0c;每个新生成的对象都有切换线程的能力&#xff0c;但是只有第一次的subscribeOn才生效&#xff0c;因为后续的线程切换被第一个“掩盖掉”了。
这么说可能有点抽象&#xff0c;下面以一张图来说明:
&#xff08;1&#xff09;每次调用subscribeOn方法&#xff0c;都会生成一个Observable&#xff0c;并且回持有上游的Observable对象。
&#xff08;2&#xff09;事件的发送是在第一次的subscribeOn创建的子线程中发送的&#xff0c;中间不会切换线程。


observeOn

通过上面subscribeOn的流程梳理&#xff0c;我们知道了上游事件是被如何切换都子线程的。下面我们将分析事件是如何被切换到下游线程的&#xff0c;大部分情况下就是我们的Android UI线程。
下面我们将结合上面的分析和代码&#xff0c;分析一下在observable.observeOn内部都做了那些操作。

observableSubscribeOn.observeOn(AndroidSchedulers.mainThread())

下面来看一下Observable的observeOn方法&#xff1a;
&#xff08;1&#xff09; observeOn 方法返回了一个 ObservableObserveOn类对象。

public final Observable observeOn(Scheduler scheduler)
return observeOn(scheduler, false, bufferSize());

public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize)
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));

接下来看看 ObservableObserveOn类。

(1)该类ObservableSubscribeOn基本一致&#xff0c;继承了 AbstractObservableWithUpstream &#xff0c;拥有ObservableSource类型对象&#xff0c;这里是ObservableSubscribeOn实例对象。
(2)在subscribeActual 方法内部&#xff0c;scheduler是HandlerScheduler类型对象&#xff0c;这里的scheduler就是展开分析了&#xff0c;基本上就是利用Android的Handler机制实现线程切换的。
(3)通过 scheduler.createWorker() 创建了 HandlerWorker的Worker对象。
(4)创建了一个ObserveOnObserver对象&#xff0c;该 类实现了Observer 接口&#xff0c;所有它是个Observer&#xff0c;同时实现了一个Runnable接口&#xff0c;这样通过Handler就可以执行到ObserveOnObserver的run方法。

public final class ObservableObserveOn extends AbstractObservableWithUpstream
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource source, Scheduler scheduler, boolean delayError, int bufferSize)
super(source);
this.scheduler &#61; scheduler;
this.delayError &#61; delayError;
this.bufferSize &#61; bufferSize;

&#64;Override
protected void subscribeActual(Observer observer)
if (scheduler instanceof TrampolineScheduler)
source.subscribe(observer);
else
Scheduler.Worker w &#61; scheduler.createWorker();
source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));


...........

下面就来看看这个 ObserveOnObserver&#xff0c;通过上面我们知道&#xff0c;线程切换是通过Handler实现的。
(1)actual 参数是我们创建的Observer对象。
(2)Worker参数是HandlerWorker对象。通过AndroidSchedulers.mainThread()的调用是创建的。

static final class ObserveOnObserver extends BasicIntQueueDisposable
implements Observer, Runnable
private static final long serialVersionUID &#61; 6576896619930983584L;
final Observer actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue queue;
Disposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer actual, Scheduler.Worker worker, boolean delayError, int bufferSize)
this.actual &#61; actual;
this.worker &#61; worker;
this.delayError &#61; delayError;
this.bufferSize &#61; bufferSize;


下面来看一下run方法里面的操作&#xff1a;
outputFused参数默认是false&#xff0c;所以接下来看看drainNormal方法。

&#64;Override
public void run()
if (outputFused)
drainFused();
else
drainNormal();


&#xff08;1&#xff09;queue参数是在onSubscribe方法里面创建的。而onSubscribe方法的调用&#xff0c;则是在上游的subscribeActual方法中调用的。
&#xff08;2&#xff09;内部通过轮训队列里面的事件&#xff0c;将事件最终发送到Observer。

void drainNormal()
int missed &#61; 1;
final SimpleQueue q &#61; queue;
final Observer a &#61; actual;
for (;;)
if (checkTerminated(done, q.isEmpty(), a))
return;

for (;;)
boolean d &#61; done;
T v;
try
v &#61; q.poll();
catch (Throwable ex)
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
boolean empty &#61; v &#61;&#61; null;
if (checkTerminated(d, empty, a))
return;
if (empty)
break;
a.onNext(v);

missed &#61; addAndGet(-missed);
if (missed &#61;&#61; 0)
break;



下面以一张图&#xff0c;总结一下&#xff1a;observeOn和subscribeOn的流程。


总结

(1)subscribeOn 控制上游线程切换&#xff0c;subscribeOn多次调用只有第一次的subscribeOn会起作用。
(2)observeOn控制下游线程切换。observeOn可以使用多次。并且observeOn 后面的所有操作都会在observeOn指定的线程中执行。
(3)subscribeOn和observeOn之间的操作&#xff0c;会在subscribeOn 指定的线程中执行&#xff0c;直到执行了observeOn操作。


推荐阅读
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • 湍流|低频_youcans 的 OpenCV 例程 200 篇106. 退化图像的逆滤波
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了youcans的OpenCV例程200篇106.退化图像的逆滤波相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文主要解析了Open judge C16H问题中涉及到的Magical Balls的快速幂和逆元算法,并给出了问题的解析和解决方法。详细介绍了问题的背景和规则,并给出了相应的算法解析和实现步骤。通过本文的解析,读者可以更好地理解和解决Open judge C16H问题中的Magical Balls部分。 ... [详细]
  • 本文介绍了UVALive6575题目Odd and Even Zeroes的解法,使用了数位dp和找规律的方法。阶乘的定义和性质被介绍,并给出了一些例子。其中,部分阶乘的尾零个数为奇数,部分为偶数。 ... [详细]
  • Week04面向对象设计与继承学习总结及作业要求
    本文总结了Week04面向对象设计与继承的重要知识点,包括对象、类、封装性、静态属性、静态方法、重载、继承和多态等。同时,还介绍了私有构造函数在类外部无法被调用、static不能访问非静态属性以及该类实例可以共享类里的static属性等内容。此外,还提到了作业要求,包括讲述一个在网上商城购物或在班级博客进行学习的故事,并使用Markdown的加粗标记和语句块标记标注关键名词和动词。最后,还提到了参考资料中关于UML类图如何绘制的范例。 ... [详细]
  • 点击上方“新机器视觉”,选择加”星标”或“置顶”重磅干货,第一时间送达很早就想总结一下前段时间学习HALCON的心得,但由于其他的事情总是抽不出时间。去年有过一段时间的集中学习,做 ... [详细]
  • 欧拉回路是指不令笔离开纸面,可画过图中每条边仅一次,且可以回到起点的一条回路。现给定一个图,问是否存在欧拉回路?Input测 ... [详细]
  • Problemexplanation: ... [详细]
author-avatar
手机用户2602913361
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有